[AWS IoT Greengrass V2] コンポーネントでコアデバイス間のPublish/Subscribeを試してみました
1 はじめに
IoT事業部の平内(SIN)です。
前回、AWS IoT Greengrass Core interprocess communication (IPC) を使用して、IoT Coreのメッセージブローカーにアクセスしてみました。
今回は、メッセージブローカーではなく、コアデバイス間でのPub/Subを試してみました。
参考:Publish/subscribe local messages
下記の動画は、作成したサプンルの動作を確認している様子です。左のコンソールは、PublishとSubscribeのコンポーネントのログをtailしているもので、右は開発デバイス上のローカルデバッグコンソールでコンポーネントの再起動などを行なっているものです。
操作している内容は以下のとおりです。
- Subscriberコンポーネントを再起動(メッセージ待機に入る)
- Publisherコンポーネントを再起動(メッセージを3回送って終了)
- Publisherコンポーネントを起動(メッセージを3回送って終了)
2 Publisher
Publishするコンポーネントは、以下の通りです。
(1) artifact
コードです。IoT Coreに対する操作と殆ど同じです。
PublishMessageには、message(バイナリ用)及び、json_message(JSON用)の2種類のオブジェクトがありますが、今回は、JSON用を使用しています。
publisher.py
import time import awsiot.greengrasscoreipc from awsiot.greengrasscoreipc.model import ( PublishToTopicRequest, PublishMessage, JsonMessage ) TIMEOUT = 10 topic = "topic" print("? local_publisher start.") ipc_client = awsiot.greengrasscoreipc.connect() for i in range(3): message = { "counter": i } request = PublishToTopicRequest() request.topic = topic publish_message = PublishMessage() publish_message.json_message = JsonMessage() publish_message.json_message.message = message request.publish_message = publish_message operation = ipc_client.new_publish_to_topic() operation.activate(request) future = operation.get_response() future.result(TIMEOUT) print("? publish :{}".format(message)) time.sleep(1) print("? local_publisher finish.")
(2) recipe
レシピでは、accessControlでaws.greengrass.ipc.pubsubへのaws.greengrass#PublishToTopicが許可されています。
com.example.LocalPublisher-1.0.0.yaml
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.LocalPublisher ComponentVersion: '1.0.0' ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: com.example.LocalPublisher:pubsub:1: operations: - "aws.greengrass#PublishToTopic" resources: - "*" Manifests: - Lifecycle: Install: pip3 install awsiotsdk Run: python3 -u {artifacts:path}/publisher.py
(3) 動作確認
コンポーネントのログは、以下のようになります。
※ 確認しやすいように一部編集しています
$ sudo tail -f /greengrass/v2/logs/com.example.LocalPublisher.log com.example.LocalPublisher: stdout. ? local_publisher start.. {scriptName=services.com.example.LocalPublisher.lifecycle.Run, serviceName=com.example.LocalPublisher, currentState=RUNNING} com.example.LocalPublisher: stdout. ? publish :{'counter': 0}. {scriptName=services.com.example.LocalPublisher.lifecycle.Run, serviceName=com.example.LocalPublisher, currentState=RUNNING} com.example.LocalPublisher: stdout. ? publish :{'counter': 1}. {scriptName=services.com.example.LocalPublisher.lifecycle.Run, serviceName=com.example.LocalPublisher, currentState=RUNNING} com.example.LocalPublisher: stdout. ? publish :{'counter': 2}. {scriptName=services.com.example.LocalPublisher.lifecycle.Run, serviceName=com.example.LocalPublisher, currentState=RUNNING} com.example.LocalPublisher: stdout. ? local_publisher finish.. {scriptName=services.com.example.LocalPublisher.lifecycle.Run, serviceName=com.example.LocalPublisher, currentState=RUNNING}
3 Subscriber
サブスクライブ側のコードです。到着したメッセージの内容もJSON用(event.json_message.message)を指定する事に注意が必要です。
(1) artifact
subscriber.py
import time import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( SubscribeToTopicRequest, SubscriptionResponseMessage ) TIMEOUT = 10 topic = "topic" print("? local_subscriber start.") ipc_client = awsiot.greengrasscoreipc.connect() class StreamHandler(client.SubscribeToIoTCoreStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: SubscriptionResponseMessage) -> None: print("? payload:{}".format(event.json_message.message)) def on_stream_error(self, error: Exception) -> bool: return True def on_stream_closed(self) -> None: pass request = SubscribeToTopicRequest() request.topic = topic handler = StreamHandler() operation = ipc_client.new_subscribe_to_topic(handler) future = operation.activate(request) future.result(TIMEOUT) while True: time.sleep(1) operation.close() print("? local_subscriber finish.")
(2) recipe
aws.greengrass.ipc.pubsubで、aws.greengrass#SubscribeToTopicを許可しています。なお、LifecycleのRunで設定するPythonのパラメータに、-u が無いと、whileループでログが上手く確認できないので注意が必要です。
com.example.LocalSubscriber-1.0.0.yaml
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.LocalSubscriber ComponentVersion: '1.0.0' ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: com.example.LocalSubscriber:pubsub:1: operations: - "aws.greengrass#SubscribeToTopic" resources: - "*" Manifests: - Lifecycle: Install: pip3 install awsiotsdk Run: | python3 -u {artifacts:path}/subscriber.py
(3) 動作確認
コンポーネントのログでは、メッセージを受信できていることが確認できます。
※ 確認しやすいように一部編集しています
com.example.LocalSubscriber: stdout. ? local_subscriber start.. {scriptName=services.com.example.LocalSubscriber.lifecycle.Run, serviceName=com.example.LocalSubscriber, currentState=RUNNING} com.example.LocalSubscriber: stdout. ? payload:{'counter': 0.0}. {scriptName=services.com.example.LocalSubscriber.lifecycle.Run, serviceName=com.example.LocalSubscriber, currentState=RUNNING} com.example.LocalSubscriber: stdout. ? payload:{'counter': 1.0}. {scriptName=services.com.example.LocalSubscriber.lifecycle.Run, serviceName=com.example.LocalSubscriber, currentState=RUNNING} com.example.LocalSubscriber: stdout. ? payload:{'counter': 2.0}. {scriptName=services.com.example.LocalSubscriber.lifecycle.Run, serviceName=com.example.LocalSubscriber, currentState=RUNNING} com.example.LocalSubscriber: stdout. ? payload:{'counter': 0.0}. {scriptName=services.com.example.LocalSubscriber.lifecycle.Run, serviceName=com.example.LocalSubscriber, currentState=RUNNING} com.example.LocalSubscriber: stdout. ? payload:{'counter': 1.0}. {scriptName=services.com.example.LocalSubscriber.lifecycle.Run, serviceName=com.example.LocalSubscriber, currentState=RUNNING} com.example.LocalSubscriber: stdout. ? payload:{'counter': 2.0}. {scriptName=services.com.example.LocalSubscriber.lifecycle.Run, serviceName=com.example.LocalSubscriber, currentState=RUNNING}
4 最後に
今回は、コアデバイス間でのPub/Subを試してみました。
コード自体が全然別なので、アクセス制御だけで、IoT Coreのメッセージブローカーと共有することはできません。 また、メッセージオブジェクトにバイナリ用とJSON用があって、どちらを使用しているかを意識することが大事だと思いました。
5 参考リンク
[AWS IoT Greengrass V2] RaspberryPIにインストールしてみました
[AWS IoT Greengrass V2] RaspberryPIでコンポーネントを作成してみました
[AWS IoT Greengrass V2] クラウド側から複数のコアデバイスにコンポーネントをデプロイしてみました
[AWS IoT Greengrass V2] クラウド側からコンポーネントを削除してみました
[AWS IoT Greengrass V2] ローカルデバッグコンソール(aws.greengrass.LocalDebugConsole)を使用してみました
[AWS IoT Greengrass V2] Lambda関数(コンポーネント)をデプロイしてみました
[AWS IoT Greengrass V2] コンポーネントからIoT CoreのメッセージブローカーにPublish/Subscribeしてみました
[AWS IoT Greengrass V2] コンポーネントからシークレットマネージャにアクセスしてみました